In [None]:
#| default_exp normalization

## Within-condition normalization
It is common practice and highly recommended to measure multiple samples of a given condition. This ensures that observed changes between conditions are not just due to random variation. Examples of samples within the same condition could be biological replicates, but also patients with the same clinical condition. 
We want to ensure that systematic changes between within-condition samples are corrected for as follows:

* Our assumed input values are log2 transformed peptide ion intensities, which are stored in a 2d numpy array called "samples". Each row in samples represents a peptide and each column represents a sample

* In a first step, we determine all pairwise distances between the samples (details explained below)
* We then choose the pair of samples with the closest distance between each other
* We randomly choose one "anchor" sample and one "shift" sample and we subtract the distance between the samples from each peptide intensity measured in the "shift" sample. This is equivalent to rescaling the intensities of the original sample by a constant factor such that the distributions are aligned
* We then construct a virtual "merged" sample by computing the average intensities of anchor and shift sample
* We repeat the steps above until all samples are merged. Keeping track of the shift factors allows us then to determine an ideal shift for each sample



### Find the best matching pair
Take all pairs of the columns in the "samples" array that have not already been merged and compute the distance between the pairs as follows:
* Subtract sample1 from sample2 (or sample2 from sample1, the order does not matter)
* This results in a distribution of differences. As the samples array contains log2 intensities, this corresponds to taking log2 fold changes
* Take the median of the distribution, this is a good approximation for the change between the two distributions
* Select the two samples with the lowest absolute change

### Shifting samples
When we have computed the distance between two samples, we want to correct one of the samples by this distance. This results in two distributions with the same median value. We always shift the sample which has been merged from fewer distributions (see below for details). The sample to which the shift is applied is called the "shift" sample and the sample which is not shifted is called the "anchor" sample.
A "total shift" is calculated after all samples are merged, by keeping track of all the shifts that have been applied to a sample in total.

### Merging distributions
After we shift two distributions on top of each other, we calculate a "merged" distribution. Each intensity in the merged distribution is the weighted average of the intensities in both distributions. The weights take the following into account: if, for example, the anchor sample has already been merged from 10 samples and the shift distribution has not been merged at all, we want to give the distribution coming from many samples a higher weight. We hence weight each distribution by the number of samples it has been merged from. For example, merging an anchor intensity of 1 (merged from 4 samples) with a shift intensity of 2 (merged from 1 sample) gives $(4 \cdot 1 + 1 \cdot 2)/5 = 1.2$.

## Wrapper functions

## Shift linear to reference

## Unit tests

In [None]:
#| include: false
import numpy as np
import directlfq.normalization as lfq_norm

def test_merged_distribs():
    """Check that `merge_distribs` returns the count-weighted average of two distributions.

    The anchor distribution (all ones) is given a count of 4 and the shift
    distribution (all twos) a count of 1, so every merged value is expected to
    be (4 * 1 + 1 * 2) / 5 = 1.2.
    """
    anchor_distrib = np.array([1, 1, 1, 1, 1])
    shift_distrib = np.array([2, 2, 2, 2, 2])
    counts_anchor_distrib = 4
    counts_shifted_distib = 1
    merged_distrib = lfq_norm.merge_distribs(anchor_distrib, shift_distrib, counts_anchor_distrib, counts_shifted_distib)
    # Compare every element with a floating-point tolerance. An `==` comparison
    # reduced with `.any()` would already pass if a single element matched.
    np.testing.assert_allclose(merged_distrib, np.full(len(anchor_distrib), 1.2))


# Run the test immediately so that a failure shows up when this cell is executed
test_merged_distribs() 


In [None]:
#| include: false
import numpy as np
import pandas as pd
import directlfq.normalization as lfq_norm


def test_order_of_shifts():
    """Check the shifts computed by `get_normfacts` on a small profile with missing values.

    Four value rows (two of them containing NaN) are passed to
    `lfq_norm.get_normfacts` and the returned index-to-shift mapping is compared
    against the expected shifts. The distance matrix and the shifted profile are
    displayed for visual inspection only.
    """
    vals1 = [1, np.nan, 1.5]
    vals2 = [1, 1, np.nan]
    vals3 = [3.2, 1, 2.8]
    vals4 = [4.2, 2, 3.8]
    list_of_vals = [vals1, vals2, vals3, vals4]
    protein_profile_df = create_input_df_from_input_vals(list_of_vals)
    display(protein_profile_df)
    protein_profile_numpy = protein_profile_df.to_numpy()
    sample2shift = lfq_norm.get_normfacts(protein_profile_numpy)

    # The shifts are results of floating-point arithmetic, so compare them with a
    # tolerance instead of relying on an exact repr such as -1.2999999999999998.
    expected_shifts = {1: 0.0, 2: -1.3, 3: -2.3}
    assert set(sample2shift) == set(expected_shifts)
    for sample_idx, expected_shift in expected_shifts.items():
        np.testing.assert_allclose(sample2shift[sample_idx], expected_shift, rtol=0, atol=1e-9)

    print(lfq_norm.create_distance_matrix(protein_profile_numpy, metric = 'variance'))

    df_normed = pd.DataFrame(lfq_norm.apply_sampleshifts(protein_profile_numpy, sample2shift), index = protein_profile_df.index, columns = protein_profile_df.columns)
    display(df_normed)

def create_input_df_from_input_vals(list_of_vals):
    """Wrap rows of values in a DataFrame indexed by (protein, ion) pairs.

    Every row is assigned to protein "A" and labelled "ion<row position>".
    """
    number_of_rows = len(list_of_vals)
    protein_ion_pairs = []
    for row_position in range(number_of_rows):
        protein_ion_pairs.append(("A", f"ion{row_position}"))
    protein_ion_index = pd.Index(protein_ion_pairs, name=('protein', 'ion'))
    return pd.DataFrame(list_of_vals, index=protein_ion_index)


In [None]:

# Run the shift-order test defined in the previous cell
test_order_of_shifts()

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import directlfq.normalization as lfq_norm

def generate_randarrays(number_arrays, size_of_array, seed=None):
    """Generate normally distributed arrays that are each offset by a random shift.

    Args:
        number_arrays: Number of arrays (rows) to generate.
        size_of_array: Number of values per array.
        seed: Optional seed. When given, a dedicated `np.random.RandomState(seed)`
            is used so the result is reproducible. When None (default), NumPy's
            global random state is used, as before.

    Returns:
        A 2d numpy array of shape (number_arrays, size_of_array). Each row is drawn
        from a normal distribution whose mean is a shift drawn uniformly from
        [-10, 10).
    """
    # `np.random` and `RandomState` expose the same `uniform`/`normal` functions,
    # so the sampling code below is identical for both cases.
    rng = np.random if seed is None else np.random.RandomState(seed)
    randarray = []
    for _ in range(number_arrays):
        shift = rng.uniform(low=-10, high=+10)
        randarray.append(rng.normal(loc=shift, size=size_of_array))
    return np.array(randarray)

def test_sampleshift(samples):
    """Plot a histogram per row of `samples` and check the spread of the pooled values.

    All rows are pooled into one list; the test passes when the standard
    deviation of the pooled values is at most 1.2.
    """
    number_of_rows = samples.shape[0]
    for row_idx in range(number_of_rows):
        plt.hist(samples[row_idx])
    pooled_values = [value for row_idx in range(number_of_rows) for value in samples[row_idx]]
    pooled_stdev = np.std(pooled_values)
    print(f"STDev {pooled_stdev}")
    assert pooled_stdev <= 1.2

    plt.show()
# Seed NumPy's global random state so that the simulated samples, and therefore
# the standard-deviation check below, are reproducible between runs.
np.random.seed(42)
randarray = generate_randarrays(5, 1000)
# Determine one shift per simulated sample and apply it before checking the result
sample2shift = lfq_norm.get_normfacts(randarray)
normalized_randarray = lfq_norm.apply_sampleshifts(randarray, sample2shift)
test_sampleshift(normalized_randarray)

In [None]:
#| include: false
import directlfq.visualizations as lfq_viz
import directlfq.utils as lfq_utils
import directlfq.normalization as lfq_norm

def test_normalizing_between_samples(num_samples_quadratic):
    """Normalize the "Shotgun" columns of the benchmark peptide table and check the result.

    The table is read from the unit-test data directory, passed through
    `lfq_utils.index_and_log_transform_input_df` and reduced to the columns whose
    name contains "Shotgun". The within-condition fold changes are plotted before
    and after normalization, and the normalized table is checked with
    `assert_that_results_scatter_around_zero`.

    Args:
        num_samples_quadratic: Forwarded unchanged to
            `lfq_norm.NormalizationManagerSamples`.
    """
    input_file = "../test_data/unit_tests/protein_normalization/peptides.txt.maxquant_peptides_benchmarking.aq_reformat.tsv"
    raw_df = pd.read_csv(input_file, sep = '\t')
    transformed_df = lfq_utils.index_and_log_transform_input_df(raw_df)
    shotgun_columns = [column for column in transformed_df.columns if "Shotgun" in column]
    shotgun_df = transformed_df[shotgun_columns]
    lfq_viz.plot_withincond_fcs(shotgun_df)
    normalization_manager = lfq_norm.NormalizationManagerSamples(shotgun_df, num_samples_quadratic=num_samples_quadratic)
    shotgun_df_normalized = normalization_manager.complete_dataframe
    lfq_viz.plot_withincond_fcs(shotgun_df_normalized)
    assert_that_results_scatter_around_zero(shotgun_df_normalized)


def assert_that_results_scatter_around_zero(input_df_normalized):
    """Assert that every column of a normalized table is centred on the row medians.

    For each row the median over all columns is subtracted. Afterwards the median
    of each column must lie within 0.1 of zero.

    Args:
        input_df_normalized: DataFrame of normalized values (rows are compared
            across columns).

    Raises:
        AssertionError: If the median deviation of any column is 0.1 or more away
            from zero, in either direction.
    """
    median_intensities = input_df_normalized.median(axis=1)
    input_df_subtracted = input_df_normalized.subtract(median_intensities, axis=0)
    median_of_medians = input_df_subtracted.median(axis=0)
    # Use the absolute deviation: a one-sided `< 0.1` check would also accept
    # columns that are shifted far below zero.
    assert (median_of_medians.abs() < 0.1).all()
    print("checked that close to zero")


In [None]:

# Run the normalization test with three different values of num_samples_quadratic
test_normalizing_between_samples(100)
test_normalizing_between_samples(3)
test_normalizing_between_samples(1)

In [None]:
import directlfq.normalization as lfq_norm
import directlfq.test_utils as lfq_test_utils
import numpy as np

def test_that_profiles_without_noise_are_shifted_exactly_on_top_of_each_other():
    """Check that noise-free peptide profiles coincide after ion-profile normalization.

    Four simulated peptides of protein "protA" with different systematic shifts
    and fractions of zeros are normalized with `lfq_norm.normalize_ion_profiles`.
    All non-missing values in the column at position 11 must then be equal within
    the default `np.allclose` tolerances.
    """
    # One (fraction_zeros_in_profile, systematic_peptide_shift) pair per peptide
    peptide_settings = [(0.1, 3000), (0.9, 3), (0.1, 0.1), (0.9, 100)]
    peptides = [
        lfq_test_utils.PeptideProfile(protein_name="protA", fraction_zeros_in_profile=fraction_zeros, systematic_peptide_shift=peptide_shift, add_noise=False)
        for fraction_zeros, peptide_shift in peptide_settings
    ]
    protein_df = lfq_test_utils.ProteinProfileGenerator(peptides).protein_profile_dataframe
    display(protein_df)
    normed_ion_profile = lfq_norm.normalize_ion_profiles(protein_df)
    display(normed_ion_profile)
    values_in_column = normed_ion_profile.iloc[:, 11].dropna().to_numpy()
    display(values_in_column)
    reference_value = values_in_column[0]
    assert np.allclose(values_in_column, reference_value)

def test_that_profiles_with_noise_are_close():
    """Check that noisy peptide profiles end up close together after normalization.

    Four simulated peptides of protein "protA" without zeros but with noise are
    normalized with `lfq_norm.normalize_ion_profiles`. All non-missing values in
    the column at position 9 must agree with the first one within a relative and
    absolute tolerance of 0.01.
    """
    systematic_shifts = [3000, 3, 0.1, 100]
    peptides = [
        lfq_test_utils.PeptideProfile(protein_name="protA", fraction_zeros_in_profile=0, systematic_peptide_shift=peptide_shift, add_noise=True)
        for peptide_shift in systematic_shifts
    ]

    protein_df = lfq_test_utils.ProteinProfileGenerator(peptides).protein_profile_dataframe
    display(protein_df)

    normed_ion_profile = lfq_norm.normalize_ion_profiles(protein_df)
    display(normed_ion_profile)
    values_in_column = normed_ion_profile.iloc[:, 9].dropna().to_numpy()

    reference_value = values_in_column[0]
    assert np.allclose(values_in_column, reference_value, rtol=0.01, atol=0.01)

# Run both profile tests: first without noise, then with noise
test_that_profiles_without_noise_are_shifted_exactly_on_top_of_each_other()
test_that_profiles_with_noise_are_close()


In [None]:
import directlfq.normalization as lfq_norm
import numpy as np

def _calc_distance(samples_1, samples_2):
    """Return the NaN-ignoring median of the distribution from `lfq_norm.get_fcdistrib`.

    Returns NaN when every value of that distribution is NaN (this also covers an
    empty distribution), without calling `np.nanmedian` in that case.
    """
    fc_distrib = lfq_norm.get_fcdistrib(samples_1, samples_2)
    if np.isnan(fc_distrib).all():
        return np.nan
    return np.nanmedian(fc_distrib)

def test_calc_distance():
    """Exercise `lfq_norm.SampleShifterLinear._calc_distance` on small hand-made inputs.

    Covers an all-NaN input, identical inputs, partially missing inputs,
    differing inputs without NaN, and empty inputs. Every assertion calls the
    library method, so the test does not depend on the notebook-local
    `_calc_distance` copy.
    """
    calc_distance = lfq_norm.SampleShifterLinear._calc_distance

    print("Test 2: One array is entirely NaN")
    samples_1 = np.array([np.nan, np.nan, np.nan])
    samples_2 = np.array([1, 2, 3])
    assert np.isnan(calc_distance(samples_1, samples_2)), "Test 2 failed: Expected NaN for an array entirely of NaNs"

    print("Test 1: Both arrays are non-NaN and identical")
    samples_1 = np.array([1, 2, 3])
    samples_2 = np.array([1, 2, 3])
    assert not np.isnan(calc_distance(samples_1, samples_2)), "Test 1 failed: Expected a non-NaN result for identical arrays"
    assert calc_distance(samples_1, samples_2) == 0, "Test 1 failed: Expected a distance of 0 for identical arrays"

    print("Test 3: Arrays with some NaN values")
    samples_1 = np.array([1, np.nan, 3])
    samples_2 = np.array([13, 2, np.nan])
    # Only the first position has a value in both arrays: 1 - 13 = -12
    assert not np.isnan(calc_distance(samples_1, samples_2)), "Test 3 failed: Expected a valid number even with some NaNs"
    assert calc_distance(samples_1, samples_2) == -12 , "Test 3 failed: Expected a distance of -12"

    print("Test 4: Arrays with different values but no NaNs")
    samples_1 = np.array([1, 4, 7])
    samples_2 = np.array([2, 5, 8])
    assert calc_distance(samples_1, samples_2) != 0, "Test 4 failed: Expected a non-zero distance for different values"

    print("Test 5: Empty arrays")
    samples_1 = np.array([])
    samples_2 = np.array([])
    assert np.isnan(calc_distance(samples_1, samples_2)), "Test 5 failed: Expected NaN for empty arrays"

    print("All tests passed!")

# Run the distance tests defined above; each test case prints its label before its assertions run
test_calc_distance()

In [None]:
import pandas as pd
import numpy as np
import directlfq.utils as lfq_utils
import directlfq.normalization as lfq_norm

import pandas as pd
import numpy as np

# Set the number of samples (columns) and ion rows of the simulated dataset
num_samples = 200
num_rows = 1000  # Example number of rows; adjust as necessary

# Define column names for samples
sample_columns = [f'sample_{i}' for i in range(1, num_samples + 1)]

# Collect all columns first and build the DataFrame once at the end. Inserting
# 200 columns one at a time fragments the frame (pandas PerformanceWarning).
simulated_columns = {
    'protein': ['protein_' + str(i) for i in np.random.randint(1, 100, size=num_rows)],  # Random protein tags
    'ion': ['ion_' + str(i) for i in range(1, num_rows + 1)],  # Unique ion tags
}

# Only rows in the lower half of the table are candidates for zero values.
# This does not depend on the column, so it is computed once.
lower_half_indices = np.arange(num_rows // 2, num_rows)

# Add sample columns
for col in sample_columns:
    intensities = np.random.normal(loc=50, size=num_rows)  # Normally distributed values centred on 50

    # Determine the fraction of zero values for the column (only for the lower half)
    fraction_zeros = np.random.uniform(0.1, 0.9)
    zero_indices = np.random.choice(lower_half_indices, int(fraction_zeros * len(lower_half_indices)), replace=False)
    intensities[zero_indices] = 0

    # Multiply the column by a random integer factor from 10 to 49
    multiplier = np.random.randint(10, 50)
    simulated_columns[col] = intensities * multiplier

df = pd.DataFrame(simulated_columns)

df = lfq_utils.index_and_log_transform_input_df(df)
display(df)

df_normalized = lfq_norm.NormalizationManagerSamplesOnSelectedProteins(df.copy(), num_samples_quadratic=20).complete_dataframe

# Show the normalized table; the un-normalized `df` was already displayed above
display(df_normalized)

In [None]:
import matplotlib.pyplot as plt
# Keep only rows without missing values and show both tables
df_normalized_nona = df_normalized.dropna()
display(df_normalized_nona)
display(df.dropna())

# Draw one boxplot figure per table: first the normalized one, then the
# un-normalized one, each with one box per 'sample_x' column
boxplot_columns = [f'sample_{i}' for i in range(1, num_samples + 1)]
for complete_rows in (df_normalized_nona, df.dropna()):
    plt.figure(figsize=(20, 10))  # Adjust the size as needed
    complete_rows.boxplot(column=boxplot_columns)
    plt.xticks(rotation=90)  # Rotating the x labels for better readability
    plt.title('Boxplot of Log-transformed Intensities for Each Sample Column')
    plt.ylabel('Log2 Intensity')
    plt.show()



## Learning Tests

In [None]:
import numpy as np
import seaborn as sns

def test_taking_the_mean_along_an_axis():
    """Learning test: `DataFrame.mean(axis=1)` averages the values of each row.

    Loads seaborn's "iris" dataset, moves "species" into the index and checks
    that the row mean at position 3 matches the mean of 4.6, 3.1, 1.5 and 0.2.
    """
    example_set = sns.load_dataset("iris").set_index("species")

    example_mean = example_set.mean(axis=1)

    # Compare with a tolerance: pandas and numpy may sum the floats in a
    # different order, so exact equality is not guaranteed.
    np.testing.assert_allclose(example_mean.to_numpy()[3], np.mean([4.6, 3.1, 1.5, 0.2]))


In [None]:

# Run the learning test defined in the previous cell
test_taking_the_mean_along_an_axis()