#### Import dependencies ####

In [7]:
import os
import sys
import time
import warnings
import math
import csv
import h5py
import string
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib.patches as mpatches
import matplotlib.colors as mcolors
from matplotlib.colors import ListedColormap
from matplotlib.ticker import LogFormatterMathtext
from mpl_scatter_density import ScatterDensityArtist
import pyideogram
import pybigtools
import scipy.optimize
import scipy.io
from scipy.ndimage import gaussian_filter
from scipy.optimize import curve_fit, fsolve
from numpy import genfromtxt
from typing import Any
from time import monotonic
import cProfile
from random import random
from itertools import accumulate
from math import floor
import gzip
from Bio import SeqIO

# General options
np.set_printoptions(threshold=sys.maxsize)

# Suppress specific warnings
warnings.filterwarnings("ignore", message="The iteration is not making good progress")
warnings.filterwarnings("ignore", message=".*Creating legend with loc=\"best\" can be slow with large amounts of data.*")
warnings.filterwarnings("ignore", message=".*All-NaN slice encountered.*", category=RuntimeWarning)

#### General variables ####

In [31]:
cell_lines = ["HeLa-S3","BJ1","IMR90","HUVEC","K562","GM12878","HepG2","MCF-7","H1","H9","HCT"]
chr_lengths = [249251, 243200, 198023, 191155, 180916, 171116, 159139, 146365, 141214, 135535, 135007, 133852, 115170, 107350, 102532, 90355, 81196, 78078, 59129, 63026, 48130, 51305]

list1 = ["time_data", "time_sim", "error", "fire_rates", "forkd","telomeres"]
list2 = ["Replication time (min)", "Replication time (min)", "Error", "Firing rate", "Fork directionality", "Telomeres"]
title_map = {key: value for key, value in zip(list1, list2)}

#### Data generation ####

In [32]:
### BigWig data ###
# From: https://genome.ucsc.edu/cgi-bin/hgFileUi?db=hg19&g=wgEncodeUwRepliSeq

def sigmoid(values, k):
    values = np.array(values)  
    if k > 0:
        return 50 * (1 + np.tanh((k / 100) * (values - 50)) / np.tanh(0.5 * k))
    elif k == 0:
        return values  # Identity function when k=0

def datagenBigWig(cell_line, chr, minp, maxp, resolution, alld, dtscale, saveQ, info, sigscale=0):
    file_path = f'data/bigwig_files/{cell_line}.bigWig'
    bw = pybigtools.open(open(file_path, 'rb'))  # Keep the original file opening method as requested
    time_data_all = bw.values(f'chr{chr}')

    global time_data
    
    if not alld:
        time_data_all = bw.values(f'chr{chr}', minp * resolution, maxp * resolution)
    
    # Sample equally spaced values from `time_data_all` with the given resolution
    time_data = np.array(time_data_all[::resolution])
    

    # Identify invalid positions
    invalid_positions = np.where(np.isnan(time_data) | (time_data <= 0))[0]
    
    # Filter the time_data
    time_data = np.nan_to_num(time_data, nan=0.0001)  # Map 'nan' to 0.0001
    time_data[time_data <= 0] = 0.0001  # Map values less than or equal to 0 to 0.0001
    time_data[time_data > 100] = 100  # Cap values greater than 100 to 100
    time_data = np.array([100 - i for i in time_data]) # Data is given in inversed scale
    # Optional: Apply sigmoid transformation (0 for no transform)
    #time_data = sigmoid(time_data, sigscale) # Use k = (0,2,5,10,50)
    # Scaling
    time_data = dtscale * time_data
    interval_min = 30 # The start of the S-phase is often detected within a range of 0 to 30 minutes into the S-phase.
    interval_max = max(time_data)
    time_data = (time_data - np.min(time_data) )/ (np.max(time_data) - np.min(time_data)) * (interval_max - interval_min) + interval_min
    time_data[invalid_positions] = max(time_data)

    if saveQ:
        np.savetxt(f"data/whole-genome_timing_data/time_data_{info}.txt", time_data, fmt='%.30f')
        np.savetxt(f"data/whole-genome_missing_data/missing_data_{info}.txt", invalid_positions, fmt='%d')

    return time_data

In [33]:
### High-resolution data ###
# From: https://www.ncbi.nlm.nih.gov/geo/query/acc.cgi?acc=GSE137764

def logistic(x, L, k, x0):
    """Logistic function used for curve fitting."""
    return L / (1 + np.exp(-k * (x - x0)))

def calculate_medians(rtil2):
    """Calculates the median time points for each list of observations."""
    medians = []

    for data in rtil2:
        if np.sum(data) == 0:
            medians.append(np.nan)  # Handle the case of all zeros or no data
            continue

        # Accumulate the data points
        accumulated_data = np.cumsum(data)

        # Normalize the accumulated data to have a final value of 1
        normalized_data = accumulated_data / accumulated_data[-1]

        # Time points evenly distributed over 10 hours
        time_points = np.linspace(0, 10, len(data))

        # Fit the logistic function to the normalized accumulated data
        try:
            params, _ = curve_fit(logistic, time_points, normalized_data, p0=[1, 1, 5])
            L, k, x0 = params
            medians.append(x0)  # Append the median time point
        except RuntimeError:
            medians.append(np.nan)  # Append NaN if the fit fails

    return medians

def datagenHighRes(cell_line, chr, minp, maxp, resolution, alld, dtscale, saveQ, info):

    global time_data
    
    # Lengths of each chromosome in kilobases
    chr_lengths = [249251, 243200, 198023, 191155, 180916, 171116, 159139, 146365, 141214, 135535, 135007, 133852, 115170, 107350, 102532, 90355, 81196, 78078, 59129, 63026, 48130, 51305]

    # Path to the data file
    matfile = f'data/high_res_files/GSE137764_{cell_line}_GaussiansGSE137764_mooth_scaled_autosome.mat'
    
    # Read the data
    data = pd.read_csv(matfile, delimiter="\t", low_memory=False)
    
    # Extract relevant columns for the chromosome
    selected_columns = [col for col in data.columns if (f'chr{str(chr)}' == col or f'chr{str(chr)}.' in col)]
    
    rtil1 = []
    for col in selected_columns:
        lcol = np.array(data[col][2:18])
        lcol[np.isnan(lcol)] = 0.  # Ensures no NaNs are processed
        rtil1.append(lcol)

    rtil2 = calculate_medians(rtil1)  # Assume this function calculates some form of median or summarization
    
    # Calculate repeat factor and ensure the length is exactly the chromosome length
    original_length = len(rtil2)
    repeat_factor = chr_lengths[chr - 1] // original_length + (chr_lengths[chr - 1] % original_length > 0)
    
    # Create the repeated array, then slice to the exact chromosome length
    extended_data = np.repeat(rtil2, repeat_factor)[:chr_lengths[chr - 1]]
    
    # Apply Gaussian smoothing
    sigma = 20  # Standard deviation for Gaussian smoothing
    time_data = gaussian_filter(extended_data, sigma=sigma)

    invalid_positions = np.where(np.isnan(time_data) | (time_data <= 0))[0]
    
    # Ensure there are no NaN values in the final output
    time_data = np.nan_to_num(time_data)
    time_data = 60 * time_data
    time_data[invalid_positions] = max(time_data)

    if saveQ:
        np.savetxt(f"data/whole-genome_timing_data/time_data_{info}.txt", time_data, fmt='%.30f')
        np.savetxt(f"data/whole-genome_missing_data/missing_data_{info}.txt", invalid_positions, fmt='%d')
    
    return time_data

In [34]:
### Simple data generation () ###
# To be used in fitting
def datagenfs(cell_line, chr_number, chrpos_min, chrpos_max, resolution, alld, dtscale, saveQ, info, sigscale=0):
    if alld:
        time_data = np.loadtxt(f'data/whole-genome_timing_data/time_data_{cell_line}_chr[{chr_number}].txt', dtype=float)
    else:
        time_data = np.loadtxt(f'data/whole-genome_timing_data/time_data_{cell_line}_chr[{chr_number}].txt', dtype=float)[chrpos_min:chrpos_max]
        np.savetxt(f"data/whole-genome_timing_data/time_data_{cell_line}_chr[{chr_number}]_{chrpos_min}-{chrpos_max}.txt", time_data, fmt='%.30f')
    return time_data

#### GRO-Seq files ####

#### Fitting ####

In [35]:
def fitfunction(list, v0, st0, fit_step, maxiter, err_threshold, saveQ, info):
    
    timel = list
    
    v = v0
    st = st0
    exp_v = np.exp(-1/v)
    x00 = np.array([(math.pi/(4*v))*i**(-2) for i in timel])
    lm = 1000 # Remove end regions for error calculation
    
    # VECTORIZED APPROACH
    
    def mse(y_true, y_pred):
        mse_value = sum((yt - yp) ** 2 for yt, yp in zip(y_true, y_pred)) / len(y_true)
        return mse_value
    
    def fast_roll_add(dst, src, shift):
        dst[shift:] += src[:-shift]
        dst[:shift] += src[-shift:]
    
    # Expected replication time computation (replaces bcs)
    def fp(x, L, v):
        n = len(x)
        y = np.zeros(n)
    
        last_exp_2_raw = np.zeros(n)
        last_exp_2 = np.ones(n)
        unitary = x.copy()
        for k in range(L+1):
            if k != 0:
                fast_roll_add(unitary, x, k)
                fast_roll_add(unitary, x, -k)
            exp_1_raw = last_exp_2_raw
            exp_1 = last_exp_2
            exp_2_raw = exp_1_raw + unitary / v
            exp_2 = np.exp(-exp_2_raw)
    
            # Compute the weighted sum for each j and add to the total
            y += (exp_1 - exp_2) / unitary
            
            last_exp_2_raw = exp_2_raw
            last_exp_2 = exp_2
        return y

    # Fitting iteration
    def fitf(time, lst, x0, j, fit_step):
        return x0[j] * (lst[j] / time[j])**(fit_step)

    # Alternative fitting
    def fitf0(time, lst, x0, j, fit_step):
        return x0[j]**(np.log(time[j]) / np.log(lst[j]))

    # Fitting control
    def cfit(time, lst, x0, fit_step):
        result = np.empty_like(x0)
        for j in range(len(x0)):
            fit_result = fitf(time, lst, x0, j, fit_step)
            if fit_result < 10**(-err_threshold):
                result[j] = 10**(-err_threshold)
            #elif abs(time[j] - lst[j]) < .5:
            #    result[j] = x0[j]
            else:
                result[j] = fit_result
        return result
    
    xs = x00
    ys = fp(xs, len(xs)//st, v)
    err = 10**10
    
    for j in range(maxiter):
        xs0 = xs
        ys0 = ys
        xs = cfit(timel, ys, xs, fit_step)
        ys = fp(xs, len(xs)//st, v)
        
        new_err = mse(timel[lm:-lm], ys[lm:-lm])
        print(str(j+1) + '/' + str(maxiter) + ' err: ' + str('{:.30f}'.format(new_err)), end="\r")
        
        err = new_err  # Update the error with the new calculated error

    fire_rates = ['{:.30f}'.format(i) for i in xs]
    time_sim = ys
    
    if saveQ:
        with open(r'data/whole-genome_firing_rates/fire_rates_'+info+'.txt', 'w') as f:
            for rate in fire_rates:
                f.write(rate + '\n')
        np.savetxt(r'data/whole-genome_timing_simulation/time_sim_'+info+'.txt', time_sim, fmt='%.30f')
    
    return [fire_rates, time_sim]

#### Error generation ####

In [36]:
def compute_squared_error(time_data, time_simulation):
    return (time_data - time_simulation) ** 2

def process_files_and_compute_squared_error(cell_lines, chr_numbers, base_path):
    for cell_line in cell_lines:
        for chr_number in chr_numbers:
            # Define file paths
            time_data_file = os.path.join(base_path, f'whole-genome_timing_data/time_data_{cell_line}_chr[{chr_number}].txt')
            time_simulation_file = os.path.join(base_path, f'whole-genome_timing_simulation/time_sim_{cell_line}_chr[{chr_number}].txt')
            error_file = os.path.join(base_path, f'whole-genome_error/error_{cell_line}_chr[{chr_number}].txt')

            # Load data
            time_data = np.loadtxt(time_data_file, dtype=float)
            time_simulation = np.loadtxt(time_simulation_file, dtype=float)

            # Compute squared error
            squared_error = compute_squared_error(time_data, time_simulation)
            
            # Save squared error to file
            np.savetxt(error_file, squared_error, fmt='%.30f')

#### bedgraph file generation ####

In [None]:
# Dictionary of chromosome sizes for hg18
def chr_size_fun(genome_build):
    if genome_build == 'hg18':
        return {
            '1': 247249719, '2': 242951149, '3': 199501827, '4': 191273063, '5': 180857866, 
            '6': 170899992, '7': 158821424, '8': 146274826, '9': 140273252, '10': 135374737, 
            '11': 134452384, '12': 132349534, '13': 114142980, '14': 106368585, '15': 100338915, 
            '16': 88827254, '17': 78774742, '18': 76117153, '19': 63811651, '20': 62435964, 
            '21': 46944323, '22': 49691432, 'X': 154913754, 'Y': 57772954
        }
    elif genome_build == 'hg19':
        return {
            '1': 249250621, '2': 243199373, '3': 198022430, '4': 191154276, '5': 180915260,
            '6': 171115067, '7': 159138663, '8': 146364022, '9': 141213431, '10': 135534747,
            '11': 135006516, '12': 133851895, '13': 115169878, '14': 107349540, '15': 102531392,
            '16': 90354753, '17': 81195210, '18': 78077248, '19': 59128983, '20': 63025520,
            '21': 48129895, '22': 51304566, 'X': 155270560, 'Y': 59373566
        }
    elif genome_build == 'hg38':
        return {
            '1': 248956422, '2': 242193529, '3': 198295559, '4': 190214555, '5': 181538259,
            '6': 170805979, '7': 159345973, '8': 145138636, '9': 138394717, '10': 133797422,
            '11': 135086622, '12': 133275309, '13': 114364328, '14': 107043718, '15': 101991189,
            '16': 90338345, '17': 83257441, '18': 80373285, '19': 58617616, '20': 64444167,
            '21': 46709983, '22': 50818468, 'X': 156040895, 'Y': 57227415
        }

def txt_to_bedgraph(cell_line, data_type='error', genome_build='hg19'):
    # Define the output file name
    if data_type=='error':
        output_file = f'data/whole-genome_error/bedgraph_files/error_{cell_line}.bedgraph'
    elif data_type=='fire_rates':
        output_file = f'data/whole-genome_firing_rates/bedgraph_files/fire_rates_{cell_line}.bedgraph'
    
    # Open the output file in write mode
    with open(output_file, 'w') as bedgraph:
        # Write the header line
        bedgraph.write('track type=bedGraph\n')
        
        # Iterate through all the txt files for the specified cell line
        for chr_number in list(map(str, range(1, 23))) + ['X', 'Y']:
            if data_type=='error':
                input_file = f'data/whole-genome_error/error_{cell_line}_chr[{chr_number}].txt'
            elif data_type=='fire_rates':
                input_file = f'data/whole-genome_firing_rates/fire_rates_{cell_line}_chr[{chr_number}].txt'
            chr_size = chr_size_fun(genome_build)[chr_number]
            
            # Check if the input file exists
            if os.path.isfile(input_file):
                with open(input_file, 'r') as infile:
                    # Read through each line and write the corresponding bedgraph entry
                    for position, value in enumerate(infile):
                        start = position * 1000
                        end = min(start + 1000, chr_size)
                        if start >= chr_size:
                            break
                        value = value.strip()
                        bedgraph.write(f'chr{chr_number}\t{start}\t{end}\t{value}\n')
            else:
                None
                
    print(f'BEDGRAPH file created: {output_file}', end="\r")

#### BCS file generation ####

In [37]:
def bcs_gen(cell_line, chr_number, chrpos_min, chrpos_max, fork_speed, fire_rates, resolution):

    file_name = cell_line+'_chr['+str(chr_number)+']_'+str(chrpos_min)+'-'+str(chrpos_max)
    
    bcfile = 'code/DNAReplication.bc'
    new_bcfile = f'code/bcs_scripts/DNAReplication_{cell_line}_chr[{chr_number}]_{chrpos_min}-{chrpos_max}.bc'
    bcsfile = []
    
    chrLength = chrpos_max - chrpos_min
    orign = int(chrLength * resolution / 1000)
    fast = 100000
    x = np.linspace(chrpos_min, chrpos_max, chrLength)  # Chromosome positions
    
    with open(bcfile, 'r') as file:
        bcsfile = file.readlines()
    bcsfile[bcsfile.index("// Chromosome length\n")+1] = "L = "+str(chrLength)+";\n"
    bcsfile[bcsfile.index("// Fast rate\n")+1] = "fast = "+str(fast)+";\n"
    bcsfile[bcsfile.index("// Fork velocity\n")+1] = "v = "+str(fork_speed)+";\n"
    
    oril = list(map(floor, np.linspace(1, chrLength, num=orign)))
    
    flistn = fire_rates
    
    # write new origins
    oriarr = np.array([
        'ORI[' + str(floor(oril[i1])) + ',' + '{:.30f}'.format(flistn[i1]) + ']'
        for i1 in range(0, orign)
    ])
    
    # delete all the origins
    with open(new_bcfile, 'w') as fp:
        for number, line in enumerate(bcsfile):
            if number not in range(bcsfile.index("// PROCESS INITIATION\n")+1, bcsfile.index("// END")-2):
                fp.write(line)
        
    # now change the last line
    with open(new_bcfile, 'r') as file:
        bcsfile = file.readlines()
        bcsfile[bcsfile.index("// PROCESS INITIATION\n")+1] = str(oriarr).replace('"','').replace("'",'').replace(" "," || ")[1:-1]+';\n'
    
    with open(new_bcfile, 'w') as file:
        file.writelines(bcsfile)

#### BCS simulation output ####

##### Replication timing, fork directionality and origins #####

In [38]:
def process_bcs_output(cell_line, chr_number, chrpos_min, chrpos_max, fork_speed, resolution, scale_factor, sim_number, compute_replication_time, compute_fork_directionality, compute_origin_positions):
    # Define the file path
    file_path = f'data/bcs_output/bcs_output_{cell_line}_chr[{chr_number}]_{chrpos_min}-{chrpos_max}.simulation.bcs'

    # Initialize arrays to store replication time and fork directionality
    DNA_replicationtime = [0.0 for _ in range(0, chrpos_max - chrpos_min)] if compute_replication_time else None
    DNA_forkdirectionality = [0.0 for _ in range(0, chrpos_max - chrpos_min)] if compute_fork_directionality else None
    DNA_originpositions = [] if compute_origin_positions else None  # List to store origin positions per simulation
    current_origins = []
    sim_iteration = 0

    with open(file_path) as f:
        for line in f:
            if sim_iteration == sim_number + 1:
                break
            if line[0] == '>':
                alreadyDone = []
                if compute_origin_positions and current_origins:  # If we have collected origins for the current simulation
                    DNA_originpositions.append(current_origins)
                current_origins = []
                print(sim_iteration, end="\r")
                sim_iteration += 1
                continue
            splitLine = line.split('\t')
            if compute_origin_positions and splitLine[2] == "ORI":
                origin_pos = int(splitLine[4])
                current_origins.append(origin_pos)
            if splitLine[2] == "FL":
                pos = int(splitLine[4]) - 1
                time = float(splitLine[0])
                if pos not in alreadyDone:
                    if compute_replication_time:
                        DNA_replicationtime[pos] += time
                    if compute_fork_directionality:
                        DNA_forkdirectionality[pos] -= 1  # Track left-moving forks
                    alreadyDone.append(pos)
            if splitLine[2] == "FR":
                pos = int(splitLine[4]) - 1
                time = float(splitLine[0])
                if pos not in alreadyDone:
                    if compute_replication_time:
                        DNA_replicationtime[pos] += time
                    if compute_fork_directionality:
                        DNA_forkdirectionality[pos] += 1  # Track right-moving forks
                    alreadyDone.append(pos)

    # Don't forget to add the origins of the last simulation
    if compute_origin_positions and current_origins:
        DNA_originpositions.append(current_origins)

    # Average the results over the number of simulations
    if compute_replication_time:
        for i in range(len(DNA_replicationtime)):
            DNA_replicationtime[i] = float(DNA_replicationtime[i]) / float(sim_number)

    if compute_fork_directionality:
        for i in range(len(DNA_forkdirectionality)):
            DNA_forkdirectionality[i] = float(DNA_forkdirectionality[i]) / float(sim_number)

    # Define file paths for saving the results
    base_path = 'data'
    replication_time_path = os.path.join(base_path, 'whole-genome_timing_bcs', f'time_bcs_{cell_line}_chr[{chr_number}]_{chrpos_min}-{chrpos_max}.txt')
    fork_directionality_path = os.path.join(base_path, 'whole-genome_fork_directionality', f'fork_directionality_{cell_line}_chr[{chr_number}]_{chrpos_min}-{chrpos_max}.txt')
    origin_positions_path = os.path.join(base_path, 'whole-genome_origins', f'origin_positions_{cell_line}_chr[{chr_number}]_{chrpos_min}-{chrpos_max}.txt')

    # Create directories if they do not exist
    os.makedirs(os.path.dirname(replication_time_path), exist_ok=True)
    os.makedirs(os.path.dirname(fork_directionality_path), exist_ok=True)
    os.makedirs(os.path.dirname(origin_positions_path), exist_ok=True)

    # Save the results to text files
    if compute_replication_time:
        np.savetxt(replication_time_path, DNA_replicationtime, fmt='%.6f')

    if compute_fork_directionality:
        np.savetxt(fork_directionality_path, DNA_forkdirectionality, fmt='%.6f')

    if compute_origin_positions:
        with open(origin_positions_path, 'w') as f:
            for origins in DNA_originpositions:
                f.write(' '.join(map(str, origins)) + '\n')

In [None]:
def process_intervals(cell_lines, chr_numbers, fork_speed=1.4, resolution=1000, scale_factor=6, sim_number=5, compute_replication_time=True, compute_fork_directionality=True, compute_origin_positions=True, interval=None):
    for cell_line in cell_lines:
        for chr_number in chr_numbers:
            chr_length = chr_lengths[chr_number - 1]  # Get the length of the chromosome
            intervals = [(interval[0], interval[1])] if interval else [(start, min(start + 10000, chr_length)) for start in range(0, chr_length, 10000)]
            for start, end in intervals:
                process_bcs_output(
                    cell_line=cell_line,
                    chr_number=chr_number,
                    chrpos_min=start,
                    chrpos_max=end,
                    fork_speed=fork_speed,
                    resolution=resolution,
                    scale_factor=scale_factor,
                    sim_number=sim_number,
                    compute_replication_time=compute_replication_time,
                    compute_fork_directionality=compute_fork_directionality,
                    compute_origin_positions=compute_origin_positions
                )

##### Interorigin distances #####

In [39]:
def compute_interorigin_distances(cell_line, chr_number, chrpos_min, chrpos_max):
    base_path = 'data'
    
    # Define file path for loading the origins
    origins_path = os.path.join(base_path, 'whole-genome_origins', f'origin_positions_{cell_line}_chr[{chr_number}]_{chrpos_min}-{chrpos_max}.txt')
    
    # Load origins data from text file
    if os.path.exists(origins_path):
        with open(origins_path, 'r') as f:
            origins_data = [list(map(int, line.strip().strip('[]').split())) for line in f]
    else:
        raise FileNotFoundError(f"Origins data not found at {origins_path}")
    
    # Compute interorigin distances for each simulation
    interorigin_distances = []
    for origins in origins_data:
        origins_sorted = sorted(origins)
        distances = np.diff(origins_sorted)
        interorigin_distances.append(distances)
    
    # Define file path for saving the interorigin distances
    iod_path = os.path.join(base_path, 'whole-genome_interorigin_distances', f'iod_{cell_line}_chr[{chr_number}]_{chrpos_min}-{chrpos_max}.txt')
    
    # Save interorigin distances to a text file
    with open(iod_path, 'w') as f:
        for distances in interorigin_distances:
            f.write(f"{list(distances)}\n")

def compute_interorigin_intervals(cell_lines, chr_numbers, interval=None):
    for cell_line in cell_lines:
        for chr_number in chr_numbers:
            chr_length = chr_lengths[chr_number - 1]  # Get the length of the chromosome
            intervals = [(interval[0], interval[1])] if interval else [(start, min(start + 10000, chr_length)) for start in range(0, chr_length, 10000)]
            for start, end in intervals:
                compute_interorigin_distances(
                    cell_line=cell_line,
                    chr_number=chr_number,
                    chrpos_min=start,
                    chrpos_max=end
                )

def average_iod_data(cell_lines, chr_numbers, factor_min=5, show_per_cell_line=False):
    all_iod_data = []
    iod_data_per_cell_line = []

    for cell_line in cell_lines:
        cell_line_iod_data = []
        for chr_number in chr_numbers:
            chr_length = chr_lengths[chr_number - 1]
            for start in range(0, chr_length, 10000):
                end = min(start + 10000, chr_length)
                iod_data = load_function_metrics(cell_line, chr_number, "iod", start, end, factor_min=factor_min)
                cell_line_iod_data.extend(iod_data)
        iod_data_per_cell_line.append(cell_line_iod_data)
        all_iod_data.extend(cell_line_iod_data)

    if show_per_cell_line:
        return iod_data_per_cell_line
    else:
        return [all_iod_data]

#### File joining functions ####

In [40]:
def join_files(cell_line, chr_number, datatype, interval=10000):
    all_data = []
    max_length = chr_lengths[chr_number - 1]
    
    for start in range(0, max_length, interval):
        end = min(start + interval, max_length)  # Ensure the end does not exceed max_length
        file_name = f'data/whole-genome_{datatype}/fork_directionality_{cell_line}_chr[{chr_number}]_{start}-{end}.txt'
        
        if os.path.exists(file_name):
            data = np.loadtxt(file_name, dtype=float)
            all_data.append(data)
        else:
            print(f'Warning: {file_name} does not exist and will be skipped.')
    
    # Concatenate all data into a single array
    if all_data:
        concatenated_data = np.concatenate(all_data)
    else:
        concatenated_data = np.array([])

    # Save concatenated data to a new file
    output_file = f'data/whole-genome_{datatype}/fork_directionality_{cell_line}_chr[{chr_number}].txt'
    np.savetxt(output_file, concatenated_data)

#### Loading functions ####

In [88]:
def load_function(cell_line, chr_number, load_type, replace_missingQ=True):
    if load_type == 'time_data':
        file_path = f'data/whole-genome_timing_data/time_data_{cell_line}_chr[{chr_number}].txt'
    elif load_type == 'time_sim':
        file_path = f'data/whole-genome_timing_simulation/time_sim_{cell_line}_chr[{chr_number}].txt'
    elif load_type == 'error':
        file_path = f'data/whole-genome_error/error_{cell_line}_chr[{chr_number}].txt'
    elif load_type == 'fire_rates':
        file_path = f'data/whole-genome_firing_rates/fire_rates_{cell_line}_chr[{chr_number}].txt'
    elif load_type == 'forkd':
        file_path = f'data/whole-genome_fork_directionality/fork_directionality_{cell_line}_chr[{chr_number}].txt'
        
    if not os.path.exists(file_path):
        return np.array([], dtype=int)
    data = np.loadtxt(file_path, dtype=float)
    if replace_missingQ:
        missing_data_path = f'data/whole-genome_missing_data/missing_data_{cell_line}_chr[{chr_number}].txt'
        if os.path.getsize(missing_data_path) > 0:
            missing_positions = np.loadtxt(missing_data_path, dtype=int)
        else:
            missing_positions = np.array([], dtype=int) 
        data[missing_positions] = np.nan
    return data

def load_function_pos(chr_number, load_type, cell_line=None, site_letter='A', base='A'):
    if load_type == 'centromeres':
        file_path = f'data/genome_regions/centromeres/positions_centromeres_chr[{chr_number}].txt'
    elif load_type == 'telomeres':
        file_path = f'data/genome_regions/telomeres/positions_telomeres_chr[{chr_number}].txt'
    elif load_type == 'fragile_sites':
        file_path = f'data/genome_regions/fragile_sites/positions_fragile_site_{chr_number}{site_letter}.txt'
    elif load_type == 'bases':
        file_path = f'data/genome_regions/bases/positions_{base}_chr[{chr_number}].txt'
        
    if not os.path.exists(file_path):
        return np.array([], dtype=int)
    data = np.loadtxt(file_path, dtype=int)
    return data

def load_function_metrics(cell_line, chr_number, load_type, chrpos_min, chrpos_max, factor_min=5, replace_missingQ=True):
    if load_type == "iod":
        file_path = f'data/whole-genome_interorigin_distances/iod_{cell_line}_chr[{chr_number}]_{chrpos_min}-{chrpos_max}.txt'
        iod_data = []
        with open(file_path, 'r') as file:
            for line in file:
                iod_values = list(map(float, line.strip().strip('[]').split(',')))
                iod_data.extend([iod for iod in iod_values if iod >= factor_min])  # Filter IOD values
        data = np.array(iod_data)
        return data

def load_missing_data(cell_line, chr_number):
    file_path = f'data/whole-genome_missing_data/missing_data_{cell_line}_chr[{chr_number}].txt'
    if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
        data = np.loadtxt(file_path, dtype=int)
    else:
        data = np.array([], dtype=int)
    return data

#### Genome regions ####

In [1]:
def generate_all_data(cell_line, chr_numbers, load_type1, load_type2):
    all_data1 = []
    all_data2 = []
    for chr_number in chr_numbers:
        data1 = load_function(cell_line, chr_number, load_type1)
        data2 = load_function(cell_line, chr_number, load_type2)
        all_data1.extend(data1)
        all_data2.extend(data2)
    return [all_data1, all_data2]

In [42]:
# Centromeres and telomeres

def gen_positions_centromere_telomeres(chr_number):
    # Telomere positions (start and end 500 kb)
    telomere_start = 500  # in kb
    telomere_end_offset = 500  # in kb

    # Centromere positions (in kb, hg38, approximate)
    centromere_positions_hg38 = [
        (121535, 124535), (92326, 95326), (90505, 93505), (49660, 52660),
        (46406, 49406), (58830, 61830), (58054, 61054), (43839, 46839),
        (47368, 50368), (39255, 42255), (51644, 54644), (34857, 37857),
        (16000, 19000), (16000, 19000), (17000, 20000), (35336, 38336),
        (22263, 25263), (15461, 18461), (24682, 27682), (26370, 29370),
        (11288, 14288), (13000, 16000)
    ]

    # Provided chromosome lengths (in kb)
    chromosome_lengths = [
        249251, 243200, 198023, 191155, 180916, 171116, 159139, 146365,
        141214, 135535, 135007, 133852, 115170, 107350, 102532, 90355,
        81196, 78078, 59129, 63026, 48130, 51305
    ]

    # Define additional positions to include telomeres and centromeres
    length = chromosome_lengths[chr_number - 1]
    centromere_start, centromere_end = centromere_positions_hg38[chr_number - 1]
    positions_telomeres = np.concatenate([
        np.arange(0, telomere_start),
        np.arange(length - telomere_end_offset, length)
    ])
    positions_centromeres = np.arange(centromere_start, centromere_end)

    # Save telomere_positions to a text file
    np.savetxt(f'data/genome_regions/telomeres/positions_telomeres_chr[{chr_number}].txt', positions_telomeres, fmt='%d')

    # Save centromere_positions to a text file
    np.savetxt(f'data/genome_regions/centromeres/positions_centromeres_chr[{chr_number}].txt', positions_centromeres, fmt='%d')

In [43]:
# Fragile sites

def gen_positions_fragile_sites(chr_number, site_letter):
    # Load the CSV file
    csv_path = 'data/fragile_sites/humCFS-fragile_sites.csv'
    df = pd.read_csv(csv_path, header=None)
    
    positions_fragile_site = []

    # Find the column corresponding to the given chromosome
    col_index = chr_number - 1  # Chromosome 1 corresponds to column 0, and so on

    if col_index >= df.shape[1]:
        print(f"Warning: Chromosome {chr_number} not found in the CSV file.")
        return

    # Find the row corresponding to the given site letter
    row_index = ord(site_letter.upper()) - ord('A')  # 'A' corresponds to row 0, 'B' to row 1, and so on

    if row_index >= df.shape[0]:
        #print(f"Warning: Site letter {site_letter} not found in the CSV file for chromosome {chr_number}.")
        return

    # Extract the range in the form chrposmin-chrposmax
    site_range = df.iloc[row_index, col_index]

    if pd.isna(site_range):
        #print(f"Warning: No data for site {site_letter} on chromosome {chr_number}.")
        return

    # Split the range into minimum and maximum positions
    pos_min, pos_max = map(int, site_range.split('-'))

    # Convert positions to kb
    pos_min_kb = pos_min // 1000
    pos_max_kb = pos_max // 1000

    # Append the range as a numpy array
    positions_fragile_site.append(np.arange(pos_min_kb, pos_max_kb))

    # Flatten the list of arrays
    flattened_positions = np.concatenate(positions_fragile_site)

    # Save positions_fragile_site to a text file, with each value on a new line
    np.savetxt(f'data/genome_regions/fragile_sites/positions_fragile_site_{chr_number}{site_letter}.txt', flattened_positions, fmt='%d')

In [None]:
# Base regions

def gen_positions_bases(local_genome_file, chr_lengths):
    bases = ['A', 'T', 'G', 'C']
    
    output_dir = 'data/genome_regions/bases'
    os.makedirs(output_dir, exist_ok=True)
    
    with gzip.open(local_genome_file, "rt") as handle:
        for record in SeqIO.parse(handle, "fasta"):
            chr_number = record.id.lstrip("chr")  # Removing 'chr' prefix
            if chr_number.isdigit():
                chr_number = int(chr_number)
                if chr_number in range(1, 23):  # Assuming we only want chromosomes 1-22
                    seq = str(record.seq).upper()  # Ensure sequence is uppercase
                    base_files = {base: [] for base in bases}
                    
                    for kb in range(chr_lengths[chr_number-1]):
                        position = kb * 1000  # 0-based position
                        if position < len(seq):
                            base_pair = seq[position]
                            if base_pair in base_files:
                                base_files[base_pair].append(kb)
                    
                    for base, locations in base_files.items():
                        with open(os.path.join(output_dir, f'positions_{base}_chr[{chr_number}].txt'), 'w') as file:
                            for loc in locations:
                                file.write(f'{loc}\n')

#### Replication timing plots

In [45]:
def rt_plotf(cell_line, chr_number, chrpos_min, chrpos_max, scale_factor, file_name, spec_fileQ, saveQ, ax=None, show_ticks=True, show_title=True, simQ=False):
    global time_data

    # Data loading (Warning: requires saving data in fitting procedure)
    # Choose between whole-genome files or particular simulation
    if spec_fileQ:
        time_data = np.loadtxt(f'data/whole-genome_timing_data/time_data_{cell_line}_chr[{chr_number}]_{chrpos_min}-{chrpos_max}.txt', dtype=float)
        time_sim = np.loadtxt(f'data/whole-genome_timing_simulation/time_sim_{cell_line}_chr[{chr_number}]_{chrpos_min}-{chrpos_max}.txt', dtype=float)
    else:
        time_data = np.loadtxt(f'data/whole-genome_timing_data/time_data_{cell_line}_chr[{chr_number}].txt', dtype=float)[chrpos_min:chrpos_max]
        time_sim = np.loadtxt(f'data/whole-genome_timing_simulation/time_sim_{cell_line}_chr[{chr_number}].txt', dtype=float)[chrpos_min:chrpos_max]

    if simQ:
        time_sim = np.loadtxt(f'data/whole-genome_timing_bcs/time_bcs_{cell_line}_chr[{chr_number}]_{chrpos_min}-{chrpos_max}.txt', dtype=float)
    
    x = np.linspace(chrpos_min, chrpos_max, chrpos_max - chrpos_min)  # Chromosome positions

    
    # Plotting
    if ax is None:
        plt.figure(figsize=(10, 6))
        ax = plt.gca()
        
    ax.plot(x, time_data, label='data', color='gray', linewidth=4, alpha=0.6)
    ax.plot(x, time_sim, label='bcs', color='red', linewidth=4, alpha=0.6)
    if show_title:
        ax.set_title(cell_line + ' - Chromosome ' + str(chr_number))
    ax.set_xlabel('Chromosome position (kb)' if show_title else None)
    ax.set_ylabel('Time in S-phase (min)' if show_title else "Replication time")
    ax.set_ylim(100 * scale_factor, 0)
    ax.set_xlim(chrpos_min, chrpos_max)  # Ensure the x-axis covers the full range
    ax.legend(loc='lower right')
    ax.grid(True)
    ax.grid(False)
    ax.tick_params(axis='both', which='both', direction='out', bottom=show_ticks, labelbottom=show_ticks, left=show_ticks, labelleft=show_ticks)
    for spine in ax.spines.values():
        spine.set_visible(True)
    
    # Save plot
    if saveQ:
        plt.savefig('figures/plot_RT_' + file_name + '.pdf', bbox_inches='tight', transparent=True)

    if ax is None:
        plt.show()

#### KDE plots

In [80]:
def plot_relative_kdes(data_list, labels, bw_adjust=1, saveQ=False, x_grid_size=1000, normalize=False, log_scale=False, x_min=None, x_max=None, plot_title="Relative density plots", x_title="Error", save_name="savedfile"):
    if log_scale:
        # Filter out non-positive values for log scale
        data_list = [data[data > 0] for data in data_list]
        if x_min is None:
            x_min = min(data.min() for data in data_list)
        if x_max is None:
            x_max = max(data.max() for data in data_list)
        x = np.logspace(np.log10(x_min), np.log10(x_max), x_grid_size)
    else:
        if x_min is None:
            x_min = min(data.min() for data in data_list)
        if x_max is None:
            x_max = max(data.max() for data in data_list)
        x = np.linspace(x_min, x_max, x_grid_size)

    plt.figure(figsize=(6, 6))

    # Compute and plot the KDEs
    if normalize:
        for data in data_list:
            ax = sns.kdeplot(data, fill=True, bw_adjust=bw_adjust, log_scale=log_scale)#, alpha=.5)
    else:
        ax = sns.kdeplot(data_list, fill=True, bw_adjust=bw_adjust, log_scale=log_scale)#, alpha=.5)
    handles = [mpatches.Patch(facecolor=color, label=label, alpha=0.5) for color, label in zip(plt.rcParams['axes.prop_cycle'].by_key()['color'], labels)]


    
    plt.title(plot_title)
    plt.xlabel(x_title)
    
    plt.gca().yaxis.set_visible(False)  # Remove y-axis ticks
    plt.ylabel('')  # Remove y-axis label
    if log_scale:
        plt.xscale('log')
    
    plt.xlim(x_min, x_max)
    plt.legend(handles=handles)
    if saveQ:
        plt.savefig(f'figures/fig_kdeplot_{save_name}.svg', bbox_inches='tight', transparent=True)
    plt.show()

#### Data vs data scatter plots ####

In [47]:
def create_modified_cmap(map_to_white):
    viridis = plt.cm.viridis
    newcolors = viridis(np.linspace(0, 1, 256))
    if map_to_white:
        white = np.array([1, 1, 1, 1])
        newcolors[:1, :] = white
    modified_cmap = mcolors.ListedColormap(newcolors)
    return modified_cmap

def plot_replication_data_vs_data(data, labels, colors, xmin=1e-15, xmax=1e5, ymin=0, ymax=500, title_x="Error", title_y="Replication time (min)",
                                  log_x=True, log_y=False, use_density=True, dpi=100, map_to_white=False, saveQ=False):
    
    fig, ax = plt.subplots(figsize=(10, 6))
    modified_cmap = create_modified_cmap(map_to_white)
    
    if use_density:
        for (data1, data2), label, color in zip(data, labels, colors):
            # Filter out NaN values
            mask = ~np.isnan(data2) & ~np.isnan(data1)
            data2 = np.array(data2)[mask]
            data1 = np.array(data1)[mask]

            density = ScatterDensityArtist(ax, data2, data1, cmap=modified_cmap, dpi=dpi)
            ax.add_artist(density)

        cbar = plt.colorbar(density, ax=ax)
        cbar.ax.set_ylabel('Density')
    else:
        for (data1, data2), label, color in zip(data, labels, colors):
            ax.scatter(data2, data1, s=0.02 if label == 'Whole-genome' else 0.1, color=color, label=label)
        # Custom legend
        handles, labels = ax.get_legend_handles_labels()
        new_handles = [plt.Line2D([], [], color=handle.get_facecolor()[0], marker='o', linestyle='', markersize=3) for handle in handles]
        ax.legend(handles=new_handles, labels=labels)

    # Set x-axis scale to log if log_x is True
    if log_x:
        ax.set_xscale('log')

    # Set y-axis scale to log if log_y is True
    if log_y:
        ax.set_yscale('log')
        ax.set_ylim((ymin, ymax))
    else:
        ax.set_ylim((ymin, ymax))
        ax.set_ylim(ax.get_ylim()[::-1])  # Invert y-axis only if not log scale

    ax.set_xlim((xmin, xmax))
    ax.set_aspect(aspect='auto')

    ax.set_title('')
    ax.set_xlabel(title_x)
    ax.set_ylabel(title_y)

    if saveQ:
        plt.savefig('figures/fig_scatter_density.pdf', bbox_inches='tight', transparent=True)
    
    plt.show()

In [48]:
def generate_plot(load_type1, load_type2, cell_line, chr_numbers, chr_numbers_tc, chr_numbers_cfs, site_letters,
                  show_whole_genome=True, show_telomeres=False, show_centromeres=False, 
                  show_fragile_sites=True, allcfsQ=False, xmin=1e-15, xmax=1e-3, ymin=0, ymax=500, log_x=True, log_y=False,
                  use_density=True, dpi=100, map_to_white=False, saveQ=False):

    # Lists to be mapped
    list1 = ["time_data", "time_sim", "error", "fire_rates", "forkd"]
    list2 = ["Replication time (min)", "Replication time (min)", "Error", "Firing rate", "Fork directionality"]
    
    # Create a dictionary using a dictionary comprehension
    title_map = {key: value for key, value in zip(list1, list2)}
    
    title_x = title_map[load_type2]
    title_y = title_map[load_type1]

    all_data = []
    labels = []
    colours = []

    if show_whole_genome:
        all_data.append(generate_all_data(cell_line, chr_numbers, load_type1, load_type2))
        labels.append("Whole-genome")
        colours.append('#1f77b4')

    if False:
        data_t, data_c = generate_telomeres_centromeres_data(cell_line, chr_numbers_tc, load_type1, load_type2)
        all_data.append(data_t)
        labels.append("Telomeres")
        colours.append('orange')

    if show_telomeres:
        positions_telomeres = np.loadtxt(f'data/genome_regions/telomeres/positions_telomeres_chr[{chr_number}].txt', dtype=int)
        all_data.append(positions_telomeres)
        labels.append("Telomeres")
        colours.append('orange')


    if show_centromeres:
        data_t, data_c = generate_telomeres_centromeres_data(cell_line, chr_numbers_tc, load_type1, load_type2)
        all_data.append(data_c)
        labels.append("Centromeres")
        colours.append('green')

    if show_fragile_sites:
        all_data_cfs = generate_fragile_sites_data(cell_line, chr_numbers_cfs, site_letters, load_type1, load_type2, allcfsQ)
        all_data.extend(all_data_cfs)
        labels.extend(["Fragile sites"] if allcfsQ else [f"FRA{chr_number}{site_letter}" for chr_number in chr_numbers_cfs for site_letter in site_letters])
        colours.extend(list(plt.cm.autumn(np.linspace(0, .5, len(labels) - len(colours)))))

    plot_replication_data_vs_data(all_data, labels, colours, xmin, xmax, ymin, ymax, title_x=title_x, title_y=title_y, log_x=log_x, log_y=log_y,
                                  use_density=use_density, dpi=dpi, map_to_white=map_to_white, saveQ=saveQ)

In [61]:
def generate_plot(load_type1, load_type2, cell_line, chr_numbers,
                  show_telomeres=False, show_centromeres=False, show_fragile_sites=False,
                  chr_number_fragile_sites=[1], site_letters=['A'],
                  xmin=1e-15, xmax=1e-3, ymin=0, ymax=500, log_x=True, log_y=False,
                  use_density=True, dpi=100, map_to_white=False, saveQ=False):

    global all_data

    # Lists to be mapped
    list1 = ["time_data", "time_sim", "error", "fire_rates", "forkd"]
    list2 = ["Replication time (min)", "Replication time (min)", "Error", "Firing rate", "Fork directionality"]
    
    # Create a dictionary using a dictionary comprehension
    title_map = {key: value for key, value in zip(list1, list2)}
    
    title_x = title_map[load_type2]
    title_y = title_map[load_type1]

    all_data = []
    labels = []
    colours = []

    all_data.append(generate_all_data(cell_line, chr_numbers, load_type1, load_type2))
    labels.append("Whole-genome")
    colours.append('#1f77b4')

    for chr_number in chr_numbers:

        if show_centromeres:
            positions_centromeres = load_function_pos(chr_number, "centromeres")
            data_centromeres = [[sublist[pos] for pos in positions_centromeres] for sublist in all_data[0]]
            all_data.append(data_centromeres)
            labels.append("" if "Centromeres" in labels else "Centromeres")
            colours.append('green')
    
        if show_telomeres:
            positions_telomeres = load_function_pos(chr_number, "telomeres")
            data_telomeres = [[sublist[pos] for pos in positions_telomeres] for sublist in all_data[0]]
            all_data.append(data_telomeres)
            labels.append("" if "Telomeres" in labels else "Telomeres")
            colours.append('orange')

    for chr_number in chr_number_fragile_sites:

        if show_fragile_sites:
            for site_letter in site_letters:
                positions_fragile_sites = load_function_pos(chr_number, "fragile_sites", site_letter=site_letter)
                data_fragile_sites = [[sublist[pos] for pos in positions_fragile_sites] for sublist in all_data[0]]
                all_data.append(data_fragile_sites)
                labels.append(f'FRA{chr_number}{site_letter}')
                colours.extend(list(plt.cm.autumn(np.linspace(0, .5, len(labels) - len(colours)))))

    plot_replication_data_vs_data(all_data, labels, colours, xmin, xmax, ymin, ymax, title_x=title_x, title_y=title_y, log_x=log_x, log_y=log_y,
                                  use_density=use_density, dpi=dpi, map_to_white=map_to_white, saveQ=saveQ)

#### Firing rate plots

In [50]:
def fire_plotf(cell_line, chr_numbers, resolution, file_name, saveQ, ax=None, aspect_ratio=(10, 6), replace_missing_with_nan=True, show_ticks=True, show_title=True):

    # Chromosome lengths in kb (1 kb resolution)
    chr_lengths = [249251, 243200, 198023, 191155, 180916, 171116, 159139, 146365, 141214, 135535, 135007, 133852, 115170, 107350, 102532, 90355, 81196, 78078, 59129, 63026, 48130, 51305]

    if ax is None:
        fig, axes = plt.subplots(len(chr_numbers), 1, figsize=aspect_ratio, sharey=True)
    else:
        axes = [ax]  # Ensure axes is a list even if it's just one subplot
    
    for idx, chr_number in enumerate(chr_numbers):
        # Data loading: Read firing rates from a text file
        firing_rates_file_path = f'data/whole-genome_firing_rates/fire_rates_{cell_line}_chr[{chr_number}].txt'
        firing_rates = np.loadtxt(firing_rates_file_path, dtype=float)

        if replace_missing_with_nan:
            missing_data_path = f'data/whole-genome_missing_data/missing_data_{cell_line}_chr[{chr_number}].txt'
            missing_positions = np.loadtxt(missing_data_path, dtype=int)
            firing_rates[missing_positions] = np.nan

        # Generate chromosome positions in Mb
        x = np.linspace(0, chr_lengths[chr_number - 1] / 1000, len(firing_rates))  # Chromosome positions in Mb

        # Plotting
        ax = axes[idx] if len(chr_numbers) > 1 else axes[0]
        ax.plot(x, firing_rates, color='#1f77b4', linewidth=2, alpha=.9)
        
        if idx == 0 and show_title:
            ax.set_title(f'{cell_line}')
        if idx == len(chr_numbers) - 1:
            if show_ticks:
                ax.set_xlabel('Chromosome position (Mb)')
                ax.set_xticks(np.arange(0, chr_lengths[0] / 1000, 20))  # Set x-axis ticks to show every 20 Mb
        else:
            ax.tick_params(axis='x', which='both', bottom=False, top=False, labelbottom=False)  # Remove x-axis ticks and labels for all but the last plot
        
        ax.set_yscale('log')
        ax.set_ylim(10**-12, 10**-3)
        ax.set_xlim(0, chr_lengths[0] / 1000)  # Set x-axis limit to the largest chromosome length in Mb
        ax.tick_params(axis='both', which='both', direction='out')
        for spine in ax.spines.values():
            spine.set_visible(True)
        
        # Add chromosome number on the top right corner with a transparent box
        ax.text(0.995, 0.93, f'chr {chr_number}', transform=ax.transAxes, 
                fontsize=12, verticalalignment='top', horizontalalignment='right',
                bbox=dict(facecolor='white', alpha=0., edgecolor='none'))
        
        ax.grid(False)  # Remove grid for each plot

    # Add a shared y-axis label if no axis is provided
    if ax is None:

        fig.text(0.065, 0.5, 'Firing rate', va='center', rotation='vertical')

        # Adjust layout
        plt.subplots_adjust(left=0.1, right=0.95, top=0.95, bottom=0.05, hspace=0.15)  # Adjust margins and spacing

    # Save plot if saveQ is True
    if saveQ:
        plt.savefig(f'figures/fig_plot_fire_signatures_{cell_line}.svg', bbox_inches='tight', transparent=True)

        plt.show()


In [51]:
def fire_plotf2(cell_line, chr_number, chrpos_min, chrpos_max, file_name, saveQ, ax=None, show_ticks=True, show_title=True):
    global fire_data

    # Data loading
    fire_data = np.loadtxt('data/whole-genome_firing_rates/fire_rates_' + cell_line + '_chr[' + str(chr_number) + '].txt', dtype=float)[chrpos_min:chrpos_max]
    x = np.linspace(chrpos_min, chrpos_max, chrpos_max - chrpos_min)  # Chromosome positions

    # Plotting
    if ax is None:
        plt.figure(figsize=(10, 6))
        ax = plt.gca()
        
    ax.plot(x, fire_data, color='#1f77b4', linewidth=2, alpha=.9)
    if show_title:
        ax.set_title(cell_line + ' - Chromosome ' + str(chr_number))
    ax.set_xlabel('Chromosome position (kb)' if show_title else None)
    ax.set_ylabel('Firing rate' if show_title else "Firing")
    ax.set_yscale('log')
    ax.set_ylim(10**-12, 10**-3)
    ax.set_xlim(chrpos_min, chrpos_max)  # Ensure the x-axis covers the full range
    ax.tick_params(axis='both', which='both', direction='out', bottom=show_ticks, labelbottom=show_ticks, left=show_ticks, labelleft=show_ticks)
    for spine in ax.spines.values():
        spine.set_visible(True)
        
    # Set logarithmic ticks for y-axis
    ax.yaxis.set_major_formatter(LogFormatterMathtext())
    ax.yaxis.set_minor_formatter(LogFormatterMathtext())

    # Save plot
    if saveQ:
        plt.savefig('figures/plot_fire_' + file_name + '.pdf', bbox_inches='tight', transparent=True)

    if ax is None:
        plt.show()

#### Error plots ####

In [52]:
def error_plotf(cell_line, chr_number, chrpos_min, chrpos_max, file_name, saveQ, ax=None, show_ticks=True, show_title=True):
    global error_data

    # Data loading
    error_data = np.loadtxt('data/whole-genome_error/error_' + cell_line + '_chr[' + str(chr_number) + '].txt', dtype=float)[chrpos_min:chrpos_max]
    x = np.linspace(chrpos_min, chrpos_max, chrpos_max - chrpos_min)  # Chromosome positions

    # Plotting
    if ax is None:
        plt.figure(figsize=(10, 6))
        ax = plt.gca()
        
    ax.plot(x, error_data, color='#ff7f0e', linewidth=2, alpha=.9)
    if show_title:
        ax.set_title(cell_line + ' - Chromosome ' + str(chr_number))
    ax.set_xlabel('Chromosome position (kb)' if show_title else None)
    ax.set_ylabel('Error' if show_title else "Error")
    ax.set_yscale('log')
    ax.set_ylim(10**-12, 10**6)
    ax.set_xlim(chrpos_min, chrpos_max)  # Ensure the x-axis covers the full range
    ax.tick_params(axis='both', which='both', direction='out', bottom=show_ticks, labelbottom=show_ticks, left=show_ticks, labelleft=show_ticks)
    for spine in ax.spines.values():
        spine.set_visible(True)
        
    # Set logarithmic ticks for y-axis
    ax.yaxis.set_major_formatter(LogFormatterMathtext())
    ax.yaxis.set_minor_formatter(LogFormatterMathtext())

    # Save plot
    if saveQ:
        plt.savefig('figures/plot_error_' + file_name + '.pdf', bbox_inches='tight', transparent=True)

    if ax is None:
        plt.show()

In [53]:
def plot_goodness_of_fit(chr_number, saveQ, ax=None, cell_lines=None, chrpos_min=0, chrpos_max=10, alld=True, base_path='data/', shift_param=1.0):
    global spaced_data, missing_data
    
    if cell_lines is None:
        cell_lines_BigWig = ["HeLa-S3","BJ1","IMR90","HUVEC","K562","GM12878","HepG2","MCF-7"]
        cell_lines_HighRes = ["H1","H9","HCT"]
        cell_lines = cell_lines_BigWig + cell_lines_HighRes

    chr_lengths = [249251, 243200, 198023, 191155, 180916, 171116, 159139, 146365, 141214, 135535, 135007, 133852, 115170, 107350, 102532, 90355, 81196, 78078, 59129, 63026, 48130, 51305]
    num_positions = chr_lengths[chr_number - 1]
    num_cell_lines = len(cell_lines)

    # Placeholder for goodness of fit data (use actual data from your model)
    goodness_of_fit = [load_function(cell_line, chr_number, 'error') for cell_line in cell_lines]

    # Apply shifted log transformation
    transformed_goodness_of_fit = [np.log1p(data + shift_param) for data in goodness_of_fit]

    # Normalize the transformed data to [0, 1] range
    min_val = np.nanmin(transformed_goodness_of_fit)
    max_val = np.nanmax(transformed_goodness_of_fit)
    normalized_goodness_of_fit = [(data - min_val) / (max_val - min_val) for data in transformed_goodness_of_fit]

    # Create a custom colormap for vivid red to vivid green
    cmap = plt.get_cmap('RdYlGn_r')  # Note the '_r' to reverse the colormap

    # Create a colormap that includes gray for the "no data" regions
    colors = cmap(np.linspace(0, 1, 256))
    colormap_gray = np.array([[0., 0., 0., 0.17]])  # RGBA for gray in colormap
    new_colors = np.vstack((colors, colormap_gray))
    extended_cmap = ListedColormap(new_colors)

    if not alld:
        normalized_goodness_of_fit = [i[chrpos_min:chrpos_max] for i in normalized_goodness_of_fit]
        num_positions = chrpos_max - chrpos_min
        x = np.linspace(chrpos_min, chrpos_max, chrpos_max - chrpos_min)  # Chromosome positions

    # Create the spaced data array with triplicates and NaN rows
    spaced_data = np.full((num_cell_lines * 4 - 1, num_positions), np.nan)
    for i in range(num_cell_lines):
        spaced_data[i * 4] = normalized_goodness_of_fit[i]
        spaced_data[i * 4 + 1] = normalized_goodness_of_fit[i]
        spaced_data[i * 4 + 2] = normalized_goodness_of_fit[i]

    # Load missing data and update goodness_of_fit
    for i, cell_line in enumerate(cell_lines):
        missing_data = load_missing_data(cell_line, chr_number) 
        if not alld:
            #missing_data = list(set(missing_data) & set(range(chrpos_min, chrpos_max)))
            missing_data = [i - chrpos_min for i in range(chrpos_min, chrpos_max) if i in missing_data]
        spaced_data[i * 4, missing_data] = 256  # Mark missing data positions with index for gray
        spaced_data[i * 4 + 1, missing_data] = 256
        spaced_data[i * 4 + 2, missing_data] = 256

    # Avoiding the maximum value being exactly 1 by subtracting a small epsilon value
    spaced_data0 = spaced_data
    spaced_data = np.clip(spaced_data * 255, 0, 255 - 1)  # Normalize to 0-255 range
    spaced_data[spaced_data0 == 256] = 256  # Ensure missing data stays at 256

    # Normalize data for colormap
    if ax is None:
        fig, ax = plt.subplots(figsize=(15, .5*num_cell_lines))
        
    cax = ax.imshow(spaced_data, aspect='auto', cmap=extended_cmap, interpolation='nearest', vmin=0, vmax=255)

    # Add color bar to the right and match height of the bars on the left
    if alld:
        cbar = fig.colorbar(cax, orientation='vertical', pad=0.02)
        cbar.set_label('Normalized error' if alld else None)
        cbar.set_ticks(np.linspace(0, 255, 6))
        cbar.set_ticklabels(np.round(np.linspace(0, 1, 6), 2))

    # Set ticks and labels
    yticks_positions = np.arange(1, num_cell_lines * 4, 4)
    ax.set_yticks(yticks_positions if alld else [])
    ax.set_yticklabels(cell_lines if alld else [])
    xtick_positions = np.arange(0, num_positions, 20000)
    xtick_labels = (xtick_positions / 1000).astype(int)
    ax.set_xticks(xtick_positions if alld else [])
    ax.set_xticklabels(xtick_labels if alld else [])
    ax.set_xlabel('Chromosome position (Mb)' if alld else None)

    ax.grid(False)

    # Add a gray square and text for "Missing data" in the top right corner
    if alld:
        ax.text(0, 1.04, f'Chromosome {chr_number}', transform=ax.transAxes, fontsize=7, verticalalignment='top', horizontalalignment='left', bbox=dict(facecolor="none", edgecolor='none'))
        ax.text(1, 1.04, 'Missing data', transform=ax.transAxes, fontsize=7, verticalalignment='top', horizontalalignment='right', bbox=dict(facecolor="none", edgecolor='none'))
        ax.add_patch(plt.Rectangle((.924, 1.02), 0.008, 0.02, transform=ax.transAxes, color=[0., 0., 0., 0.17], clip_on=False))

    if saveQ:
        plt.savefig(f'figures/fig_goodness_of_fit_chr[{chr_number}].pdf', bbox_inches='tight', transparent=True)

    if ax is None:
        plt.show()

#### Fork directionality plots ####

In [54]:
def forkd_plotf(cell_line, chr_number, chrpos_min, chrpos_max, file_name, spec_fileQ, saveQ, ax=None, show_ticks=True, show_title=True):
    global time_data

    # Data loading (Warning: requires saving data in fitting procedure)
    # Choose between whole-genome files or particular simulation
    if spec_fileQ:
        forkd_data = np.loadtxt(f'data/whole-genome_fork_directionality/fork_directionality_{cell_line}_chr[{chr_number}]_{chrpos_min}-{chrpos_max}.txt', dtype=float)
    else:
        forkd_data = np.loadtxt(f'data/whole-genome_fork_directionality/fork_directionality_{cell_line}_chr[{chr_number}].txt', dtype=float)[chrpos_min:chrpos_max]
    x = np.linspace(chrpos_min, chrpos_max, chrpos_max - chrpos_min)  # Chromosome positions

    
    # Plotting
    if ax is None:
        plt.figure(figsize=(10, 6))
        ax = plt.gca()
        
    ax.plot(x, forkd_data, label='', color='black', linewidth=2, alpha=0.6)
    if show_title:
        ax.set_title(cell_line + ' - Chromosome ' + str(chr_number))
    ax.set_xlabel('Chromosome position (kb)' if show_title else None)
    ax.set_ylabel('Fork directionality' if show_title else "Fork dir.")
    ax.set_ylim(-1, 1)
    ax.set_xlim(chrpos_min, chrpos_max)  # Ensure the x-axis covers the full range
    #ax.legend(loc='lower right')<s
    ax.grid(True)
    ax.grid(False)
    ax.tick_params(axis='both', which='both', direction='out', bottom=show_ticks, labelbottom=show_ticks, left=show_ticks, labelleft=show_ticks)
    for spine in ax.spines.values():
        spine.set_visible(True)
    
    # Save plot
    if saveQ:
        plt.savefig('figures/plot_forkd_' + file_name + '.pdf', bbox_inches='tight', transparent=True)

    if ax is None:
        plt.show()

#### Ideogram plots

In [56]:
def show_genome(cell_line, chr_number, chrpos_min, chrpos_max,
                show_genes_allQ=True, show_genes_bandsQ=True, show_genesQ=False,
                show_rt_plotQ=True, show_fire_plotQ=True, show_forkd_plotQ=False, show_error_plotQ=True, show_error_heat_plotQ=True,
                show_axisQ=True):
    chrom = f"chr{chr_number}"
    start = chrpos_min * 1000
    end = chrpos_max * 1000

    chr_lengths = [249251, 243200, 198023, 191155, 180916, 171116, 159139, 146365, 141214, 135535, 135007, 133852, 115170, 107350, 102532, 90355, 81196, 78078, 59129, 63026, 48130, 51305]

    fig = plt.figure(figsize=(10, 10), dpi=150)

    fig.suptitle(f"{cell_line} - Chromosome {chr_number}")

    height_ratios0 = [0.1, 0.1, 0.05, .2, .4, .2, .2, .2, .1, 0., 1]
    show_components = [show_genes_allQ, show_genes_allQ, show_genes_bandsQ, show_genesQ, show_rt_plotQ, show_fire_plotQ, show_forkd_plotQ, show_error_plotQ, show_error_heat_plotQ, show_axisQ, True]
    height_ratios = [height_ratios0[i] for i in range(len(height_ratios0)) if show_components[i]]

    gs_i = -1

    gs = fig.add_gridspec(
        nrows=len(height_ratios),
        ncols=3,
        width_ratios=[2, 20, 2],
        height_ratios=height_ratios,
        hspace=0.01,
        left=0.01,
        right=0.99,
        top=0.94,
    )

    # Add the full chromosome ideogram
    if show_genes_allQ:
        gs_i += 1
        all_chrom_ax = fig.add_subplot(gs[gs_i, 1])
        all_chrom_ax.axis("off")
        all_chrom_ax.set_xticks([])
        pyideogram.ideogramh(chrom, ax=all_chrom_ax)
        
        # Draw a red rectangle on top of the region depicted in the full chromosome ideogram
        rect_start = start  # Start position in base pairs
        rect_end = end  # End position in base pairs
        rectangle_thickness = 3  # Set the thickness of the rectangle outline
        all_chrom_ax.add_patch(plt.Rectangle((rect_start, -0.5), rect_end - rect_start, 1, edgecolor='red', facecolor='none', linewidth=rectangle_thickness, clip_on=False))
        
        # Draw lines from the bottom corners of the rectangle
        #all_chrom_ax.plot([rect_start, -chr_lengths[chr_number-1] * 1e3 * (1/5.25)], [-0.5, -2], color='black', linewidth=1, linestyle='-', clip_on=False)
        #all_chrom_ax.plot([rect_end, chr_lengths[chr_number-1] * 1e3+chr_lengths[chr_number-1] * 1e3 * (1/5.3)], [-0.5, -2], color='black', linewidth=1, linestyle='-', clip_on=False)

        gs_i += 1
        empty_ax = fig.add_subplot(gs[gs_i, :])
        empty_ax.axis("off")
        empty_ax.tick_params(labelbottom=False)
        #empty_ax.plot([rect_start, -chr_lengths[chr_number-1] * 1e3 * (1/5.25)], [-0.5, -2], color='black', linewidth=1, linestyle='-', clip_on=False)

    # Add the zoomed ideogram within the same interval
    if show_genes_bandsQ:
        gs_i += 1
        interval_ideogram_ax = fig.add_subplot(gs[gs_i, :])
        interval_ideogram_ax.axis("off")
        interval_ideogram_ax.set_xlim(start, end)
        interval_ideogram_ax.set_xticks([])
        pyideogram.ideogramh(chrom, ax=interval_ideogram_ax, names=True)
        zoom_ax = interval_ideogram_ax

    # Add the gene track plot
    if show_genesQ:
        gs_i += 1
        gene_track_ax = fig.add_subplot(gs[gs_i, :])
        pyideogram.genetrack(
            f"{chrom}:{start}-{end}",
            ax=gene_track_ax,
            textlane=True,
            transcriptstyle="arrowed",
            exonstyle="Box",
        )
        # Set genome ticks on the gene track plot
        pyideogram.set_genome_xticks(gene_track_ax)
        # Remove axis labels and ticks for the gene track plot
        gene_track_ax.axis("on")
        gene_track_ax.set_xticks([])
        gene_track_ax.set_ylabel('Genes')
        gene_track_ax.tick_params(labelbottom=False)
        if not show_genes_bandsQ:
            zoom_ax = gene_track_ax

    # Add RT plot
    if show_rt_plotQ:
        gs_i += 1
        rt_ax = fig.add_subplot(gs[gs_i, :])
        rt_plotf(cell_line, chr_number, chrpos_min, chrpos_max, 6, 'example_file', False, False, ax=rt_ax, show_ticks=False, show_title=False)

    # Add firing plot
    if show_fire_plotQ:
        gs_i += 1
        fire_ax = fig.add_subplot(gs[gs_i, :])
        fire_plotf2(cell_line, chr_number, chrpos_min, chrpos_max, 'example_file', False, ax=fire_ax, show_ticks=False, show_title=False)

    # Add fork directionality plot
    if show_forkd_plotQ:
        gs_i += 1
        forkd_ax = fig.add_subplot(gs[gs_i, :])
        forkd_plotf(cell_line, chr_number, chrpos_min, chrpos_max, 'example_file', False, False, ax=forkd_ax, show_ticks=False, show_title=False)

    # Add the error plot
    if show_error_plotQ:
        gs_i += 1
        error_ax = fig.add_subplot(gs[gs_i, :])
        error_plotf(cell_line, chr_number, chrpos_min, chrpos_max, 'example_file', False, ax=error_ax, show_ticks=False, show_title=False)
    
    # Add the error heat plot
    if show_error_heat_plotQ:
        gs_i += 1
        error_heat_ax = fig.add_subplot(gs[gs_i, :])
        error_heat_ax.set_ylabel('Error')
        plot_goodness_of_fit(chr_number, False, ax=error_heat_ax, cell_lines=[cell_line], chrpos_min=chrpos_min, chrpos_max=chrpos_max, alld=False)

    # Add a new row showing just the x-axis with the ticks
    if show_axisQ:
        gs_i += 1
        x_ticks_ax = fig.add_subplot(gs[gs_i, :])
        x_ticks_ax.set_xlim(start, end)
        x_ticks_ax.xaxis.set_visible(True)
        x_ticks_ax.spines['top'].set_visible(False)
        x_ticks_ax.spines['left'].set_visible(False)
        x_ticks_ax.spines['right'].set_visible(False)
        x_ticks_ax.tick_params(axis='x', which='both', bottom=True, top=False, labelbottom=True)
        # Hide y-axis and labels
        x_ticks_ax.tick_params(left=False, labelleft=False)
        x_ticks_ax.set_yticks([])
        # Set a dummy plot for ticks visibility
        x_ticks_ax.plot([start, end], [0, 0], color='white', alpha=0)  # Ensure the axis is plotted correctly
    
        # Determine tick step and format
        tick_step = (end - start) // 5
        if tick_step < 1:
            tick_step = 1
        tick_labels = []
        tick_positions = []
        for x in range(start, end + tick_step, tick_step):
            tick_positions.append(x)
            if (end - start) < 10000000:  # Less than 10 Mb range
                tick_labels.append(f'{x / 10**6:.1f}'.rstrip('0').rstrip('.'))
            else:
                tick_labels.append(f'{x / 10**6:.0f}')
        
        x_ticks_ax.set_xticks(tick_positions)
        x_ticks_ax.set_xticklabels(tick_labels)
        
        # Add a label to the x-axis
        x_ticks_ax.set_xlabel('Chromosome position (Mb)')
        
        # Add extra ticks
        minor_tick_step = tick_step // 5
        if minor_tick_step > 0:
            minor_ticks = []
            for x in range(start, end, minor_tick_step):
                if x not in tick_positions:
                    minor_ticks.append(x)
            x_ticks_ax.set_xticks(minor_ticks, minor=True)
            x_ticks_ax.tick_params(axis='x', which='minor', bottom=True, top=False, labelbottom=False)



    # Zoom the ideogram
    if show_genes_allQ:
        pyideogram.zoom(zoom_ax, all_chrom_ax)

    plt.show()